Support transactions #881

Closed
wants to merge 17 commits into from

Conversation

YordanPavlov
Contributor

@YordanPavlov YordanPavlov commented Apr 1, 2021

A pull request to expose the transaction support of librdkafka.

I noticed that support for transactions has been requested in this issue:
#871
I have done some work to expose it. Please consider this pull request, @iradul.

Replication factor of transaction related options should be reduced to 1
so that we can test with a single broker.
@YordanPavlov
Contributor Author

I have struggled to find the best way to test the library when specifying the git URL in package.json. It seems that in doing so the submodules are not present. Do you have any guidelines on how to approach this?

src/producer.cc Outdated
return Baton(RdKafka::ERR_NO_ERROR);
}
else {
Baton result(error->code());

The librdkafka transactional API uses the new richer rd_kafka_error_t type that provides some extra error attributes.
These attributes (fatal, txn_requires_abort, and retriable) need to be exposed in this node.js API as well, since they tell the application how the errors are to be handled, which cannot be derived from the error code itself.

Contributor Author

Thank you for your feedback, I have addressed it. I am now exposing a richer JS object populated with data from rd_kafka_error_t.
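
For illustration, here is a minimal sketch of how calling code might branch on those three attributes. The property names `isFatal`, `isRetriable`, and `isTxnRequiresAbort`, the helper name, and the returned action strings are assumptions made for this example; the thread only says that the JS error object is populated from rd_kafka_error_t.

```javascript
// Hypothetical helper: pick a recovery action for an error raised by a
// transactional call. The flag names are assumed, not taken from this PR.
function classifyTransactionError(err) {
  if (err.isRetriable) {
    // The same call may simply be attempted again.
    return 'retry';
  }
  if (err.isTxnRequiresAbort) {
    // The current transaction cannot succeed; abort it and begin a new one.
    return 'abort-transaction';
  }
  if (err.isFatal) {
    // The producer instance is no longer usable and must be recreated.
    return 'recreate-producer';
  }
  // No flag set: leave the decision to the application.
  return 'fail';
}

// Example with a plain object standing in for the richer error.
console.log(classifyTransactionError({ code: -1, isRetriable: true })); // prints "retry"
```

The point of the sketch is that the decision depends on the flags, not on `err.code`, which is what the review comment asks for.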

Baton InitTransactions(int32_t timeout_ms);
Baton BeginTransaction();
Baton CommitTransaction(int32_t timeout_ms);
Baton AbortTransaction(int32_t timeout_ms);

SendOffsetsToTransaction() is also required

Contributor Author

I am not familiar with committing offsets; it is something I need to investigate. Do you think it is possible to merge this PR and leave that for future work?

* Expose the richer 'rd_kafka_error_t' object returned by Rdkafka to the
  JS code.
* Add a test to cover the generated error object.
@iradul
Collaborator

iradul commented May 7, 2021

@YordanPavlov thanks for the PR! Also @edenhill thank you for reviewing this!

We'll need to do some more work for this to be merged.

Transactions are only valuable in consume-process-produce scenarios, so, as @edenhill mentioned, SendOffsetsToTransaction is a must.

I suggest you read through this Confluent page to get a better understanding.

Here are a few tasks that need to be addressed:

  • implement SendOffsetsToTransaction
  • e2e test must include a consumer with disabled auto commits and SendOffsetsToTransaction call
  • unless there is a reason not to, e2e tests should be merged into a single .spec.js file in e2e directory (no need for a new e2e_transactions directory and two specs)
  • ideally, all methods in javascript should be asynchronous
  • need to add docs on how to use this
  • need to add typescript definitions
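
As a rough illustration of the second task, the client configuration for such an e2e test might look like the sketch below. The property names are standard librdkafka settings; the helper name, broker address, group id, and transactional id are placeholders invented for this example.

```javascript
// Hypothetical helper that builds the two config objects a transactional
// e2e test would hand to the producer and the consumer.
function buildTransactionalConfigs(brokers, groupId, transactionalId) {
  const producerConfig = {
    'metadata.broker.list': brokers,
    // Setting a transactional id is what enables the transactional API.
    'transactional.id': transactionalId,
  };
  const consumerConfig = {
    'metadata.broker.list': brokers,
    'group.id': groupId,
    // Offsets are committed through the producer's transaction instead.
    'enable.auto.commit': false,
    // Only read messages from committed transactions.
    'isolation.level': 'read_committed',
  };
  return { producerConfig, consumerConfig };
}

const exampleConfigs = buildTransactionalConfigs('localhost:9092', 'example-group', 'example-tx');
console.log(exampleConfigs.consumerConfig['enable.auto.commit']); // prints false
```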

…ent as argument but pointer is not transferred correctly to C++ land.
@YordanPavlov
Contributor Author

Thanks @iradul for considering this PR. I have done some work on adding the SendOffsetsToTransaction method, but I am having difficulty transferring the consumer object from JS to C++ land. I have created a separate PR for only this feature:
santiment#2
I would appreciate it if you or someone else can have a look and provide guidance.

@YordanPavlov
Contributor Author

In the meantime I have addressed some of the other issues.

  • Added Typescript definitions.
  • Merged e2e transaction tests into e2e directory.

As for your comment 'ideally, all methods in javascript should be asynchronous': since the C++ methods are synchronous, I believe no callback is required in the JS code either.

@iradul
Collaborator

iradul commented May 10, 2021

Thanks.

I would appreciate it if you or someone else can have a look and provide guidance.

I'll work with you on this one. We'll integrate the whole feature within this PR, no need to make a new PR. You can go ahead and cherry-pick santiment#2 commits to here.

@olegladyka

Hi,
I was working on an implementation of "SendOffsetsToTransaction" from your branch and managed to get it to work.
You can see the changes here: https://github.com/olegladyka/node-rdkafka/compare/d1005e..7df42b1
However, in my implementation "SendOffsetsToTransaction" takes an additional parameter, an array of TopicPartition, which makes it possible to commit any offset, not just the current consumer offset.

Instead of fetching all offsets for all partitions for a consumer - expect to get them
as argument on SendOffsetsToTransaction.
@YordanPavlov
Contributor Author

Thank you for your contribution @olegladyka. I have merged my helper PR (the one specific to SendOffsetsToTransaction) and also copied your code, so now everything is in the current PR, as @iradul requested. Unfortunately, even with your changes, I still get a core dump at this point:

./node_modules/nan/nan_object_wrap.h:32: static T* Nan::ObjectWrap::Unwrap(v8::Local<v8::Object>) [with T = NodeKafka::KafkaConsumer]: Assertion `object->InternalFieldCount() > 0' failed.

This happens when running the test for SendOffsetsToTransaction. Could you try it for yourself, @olegladyka? I may have made a mistake, since you mentioned that it worked in your tests.

@olegladyka

olegladyka commented May 19, 2021

When you call producer.sendOffsetsToTransaction(consumer.position(), consumer, transactions_timeout_ms) in the test, you need to pass consumer.getClient() instead of consumer, like this:
producer.sendOffsetsToTransaction(consumer.position(), consumer.getClient(), transactions_timeout_ms)
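
To show where that call sits in a full cycle, here is a sketch of one consume-process-produce transaction using the synchronous call shape discussed in this thread. It assumes failures surface as thrown exceptions; `runTransaction`, `produceFn`, and the stub objects are hypothetical stand-ins so the example runs without a broker.

```javascript
// Hypothetical wrapper around one transaction. `producer` and `consumer`
// stand in for connected clients exposing the methods named in this PR.
function runTransaction(producer, consumer, timeoutMs, produceFn) {
  producer.beginTransaction();
  try {
    produceFn(producer);
    // Pass the underlying native client, not the JS wrapper object.
    producer.sendOffsetsToTransaction(consumer.position(), consumer.getClient(), timeoutMs);
    producer.commitTransaction(timeoutMs);
    return true;
  } catch (err) {
    // Assumption: any failure after begin is handled by aborting.
    producer.abortTransaction(timeoutMs);
    return false;
  }
}

// Stubs that only record the order of calls.
const calls = [];
const stubConsumer = {
  position: () => [{ topic: 'example-topic', partition: 0, offset: 42 }],
  getClient: () => ({ isNativeClient: true }),
};
const stubProducer = {
  beginTransaction: () => calls.push('begin'),
  sendOffsetsToTransaction: (offsets, client) => {
    if (!client.isNativeClient) throw new Error('expected the native client');
    calls.push('sendOffsets');
  },
  commitTransaction: () => calls.push('commit'),
  abortTransaction: () => calls.push('abort'),
};

const committed = runTransaction(stubProducer, stubConsumer, 5000, () => calls.push('produce'));
console.log(committed, calls.join(',')); // prints "true begin,produce,sendOffsets,commit"
```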

@iradul
Collaborator

iradul commented May 23, 2021

Great work guys!
@YordanPavlov, I have a few fixes to add. To push those to this PR, you'll have to check Allow edits from maintainers. Could you please do this? Thanks.

@YordanPavlov
Contributor Author

Thank you @olegladyka for pointing out the mistake on my end. Calling consumer.getClient() did indeed solve the problem. I made one more commit to tidy up the test.
@iradul I can't seem to find the Allow edits from maintainers checkbox. The PR is against upstream master; could it be that you already have access?

@iradul
Collaborator

iradul commented May 24, 2021

Hey @YordanPavlov, the checkbox should be on the right-hand side of this same page, under the Notifications group.
Anyway, I just pushed a few commits on pr881 branch, and you can cherry-pick them, for example.

Before I merge this, we need to work a bit on e2e tests. Actually, I think, for now, you can remove send_offset_transaction.spec.js and fix producer-transaction.spec.js to work with my latest commits after you include them.

@YordanPavlov
Contributor Author

I cherry-picked your commits, @iradul. I also added the small changes needed in the tests after the style changes. However, I started getting some:

  1) Transactional Producer
       with consumer
         "after each" hook for "no message should be delivered if transaction is aborted":
     TypeError: Cannot read property 'call' of undefined
      at processImmediate (node:internal/timers:464:21)

errors when running the tests. I can't reproduce them when running the test files one by one, but they happen when all the files are run together.

Some transaction end to end tests were failing for two reasons.
* A delayed message from the previous test was being read and breaking the
  logic of the test. Fixed by using a separate topic.
* One more message was being produced than expected. Fixed.
@YordanPavlov
Contributor Author

@iradul I dug into those occasional test failures and fixed them. Please have a look at my most recent commit. Let's continue working on merging this PR.

@iradul
Collaborator

iradul commented Jun 25, 2021

We need to make those new APIs async since all methods can take a significant amount of time to complete.
I already have most of this code done, but since I can't push to your branch, I'm thinking of merging this as is and then add what's missing on top of that. Or you can keep cherry-picking commits from pr881.

Also, I'd like us to include a complete e2e transactional test, for instance, similar to this.
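
One way the async shape could look from the caller's side is sketched below. It assumes, hypothetically, that each transactional method accepts a trailing Node-style callback `(err) => {}`; that signature is not confirmed anywhere in this thread. `callAsync` and the stub producer are invented for the example.

```javascript
// Hypothetical adapter: turn a method taking a trailing Node-style callback
// into a promise, so slow transactional calls do not block the event loop.
function callAsync(target, method, ...args) {
  return new Promise((resolve, reject) => {
    target[method](...args, (err, result) => (err ? reject(err) : resolve(result)));
  });
}

// Stub producer whose methods complete later, as a native call would.
const stubAsyncProducer = {
  initTransactions(timeoutMs, cb) { setImmediate(() => cb(null)); },
  commitTransaction(timeoutMs, cb) { setImmediate(() => cb(new Error('timed out'))); },
};

async function demo() {
  await callAsync(stubAsyncProducer, 'initTransactions', 5000);
  try {
    await callAsync(stubAsyncProducer, 'commitTransaction', 5000);
    return 'committed';
  } catch (err) {
    return 'failed: ' + err.message;
  }
}

demo().then((outcome) => console.log(outcome)); // prints "failed: timed out"
```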

@YordanPavlov
Contributor Author

YordanPavlov commented Jun 28, 2021

We need to make those new APIs async since all methods can take a significant amount of time to complete.
I already have most of this code done, but since I can't push to your branch, I'm thinking of merging this as is and then add what's missing on top of that. Or you can keep cherry-picking commits from pr881.

I have investigated why I am not seeing the option to grant you edit access to the PR. The reason seems to be that the fork I originally created lives in an organization rather than in my personal namespace, so GitHub does not let me grant access. Any of the three options works for me, though:

  1. I can keep cherry-picking commits.
  2. We can merge the commit and improve it further in another PR.
  3. I can create a brand new PR from my personal GitHub namespace, where I would be able to grant you edit rights, but I am afraid that we would lose the history of the discussion here.

So just let me know how you would prefer to continue.

@iradul
Collaborator

iradul commented Jun 29, 2021

Great feedback. I like the third option. You can reference this PR in the opening comment, so all the history will be available and preserved.

@YordanPavlov
Contributor Author

Sorry for the delay guys. Here is the new PR:
#903
I used the opportunity to do some cleaning of the commits. Commits by maintainer are now allowed.
